package com.example.controller;

import com.example.entity.LogInfo;
import com.example.util.InfluxDbUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * <p>
 * </p>
 *
 * @author:许腾飞
 * @since:2020/1/16 16:17
 */
@RestController
@RequestMapping("/test")
public class testController {

    @Autowired
    private InfluxDbUtils influxDbUtils;

    //先检查application.properties里的数据库连接
    //default 保存策略，保存天数：30天，保存副本数量：1

    @PostMapping("/writePoint")
    public void writePoint(){

        //选择时序数据库,不建议使用删除以及更新操作,因此不做介绍.
        InfluxDB influxDB = influxDbUtils.getInfluxDB(); //

//        LogInfo logInfo = LogInfo.builder()
//                .level(jsonObject.getString("level"))
//                .module(module)
//                .deviceId(deviceId)
//                .msg(jsonObject.getString("msg"))
//                .build();

//        以下为业务数据结构体，Tag,Field在结构体中加以区别，以单条日志保存为例

        String deviceId="123";
        LogInfo logInfo=LogInfo.builder()
                .level("normal")
                .module("waterMeter")
                .deviceId(deviceId)
                .msg("38.6").build();
        //--------

        //Point是标准的数据结构体，包括了measurement,tag,fields
        Point point = Point.measurementByPOJO(logInfo.getClass())
                .addFieldsFromPOJO(logInfo)
                .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                .build();
        // 出于业务考量,设备可以设置不同的保存策略(策略名为固定前缀+设备ID)
        influxDB.write(influxDbUtils.database, influxDbUtils.DefaultPolicyName, point);
        //influxDB.write(influxDbUtils.database, InfluxDbUtils.policyNamePix + deviceId, point);
    }

    //查询
    @GetMapping("/queryPoints")
    public String  queryPoints(){
        // InfluxDB支持分页查询,因此可以设置分页查询条件
        int pageSize=10;
        int pageNum=1;

        String pageQuery = " LIMIT " + pageSize + " OFFSET " + (pageNum-1)*pageSize;
        // 此处查询所有内容,如果0
        //SELECT * FROM "logInfo" WHERE time > now() - 5m
        String queryCondition=" WHERE time > now() - 5m ";
        String queryCmd = "SELECT * FROM "
                // 查询指定设备下的日志信息
                // 要指定从 RetentionPolicyName(保存策略前缀+设备ID).measurement(logInfo) 中查询指定数据)
                + " \"logInfo\" "
                //+ InfluxDbUtils.policyNamePix + "123" + "." + "logInfo"
                // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度)
                + queryCondition
                // 查询结果需要按照时间排序
                + " ORDER BY time DESC"
                // 添加分页查询条件
                + pageQuery;

        InfluxDB influxDB = influxDbUtils.getInfluxDB();
        QueryResult result= influxDB.query(new Query(queryCmd,influxDbUtils.database));
        //influxDB.query(new Query("CREATE DATABASE " + database));
        return result.toString();
    }

    //批量插入
//    InfluxDB 批量发送消息有两种模式：定时定量（这个词等会具体解释）和BatchPoints
//    1、每次设置一条，满足一定条件之后（定时或者定量）插入数据库。
//    2、一次性设定多条数据插入数据库；BatchPoints是把多条数据同时设置进去，一次性发送请求。
//    对于InfluxDB的使用，肯定是大量的，请求密集，对性能要求很高。所以这个时候就要求数据的保存尽量使用异步完成。
//    同样InfluxDB也提供了这种功能，


    //批量写入方式一：BatchPoints 批量设置，一次性发送请求
    @PostMapping("/batchWritePoints")
    public void batchWritePoints(){
        InfluxDB influxDB = influxDbUtils.getInfluxDB();
        //创建默认策略
        //influxDbUtils.createDefaultRetentionPolicy();

        List<String> records = new ArrayList<String>();
        //随机数模拟实际值
        Random rand =new Random(25);
        int value=rand.nextInt(100);

        //模拟2块水表的数据插入一个数据表中
        for (int i = 0; i < 1; i++) {

            String deviceId="device"+i;
            BatchPoints batchPoints=BatchPoints.database(influxDbUtils.database)
                    .retentionPolicy(influxDbUtils.DefaultPolicyName)
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();

            //每个水表10000条记录
            for (int j = 0; j < 10000; j++) {
                //sleep
                try {
                    Thread.sleep(10);                 //1000 milliseconds is one second.
                } catch(InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }

                //radom value
                value=rand.nextInt(100);

                LogInfo logInfo=LogInfo.builder()
                        .level("normal")
                        .module("waterMeter")
                        .deviceId(deviceId)
                        .msg(""+j).build();

                Point point = Point.measurementByPOJO(logInfo.getClass())
                        .addFieldsFromPOJO(logInfo)
                        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                        .build();

                batchPoints.point(point);
            }
            records.add(batchPoints.lineProtocol());
            influxDB.write(influxDbUtils.database,influxDbUtils.DefaultPolicyName, InfluxDB.ConsistencyLevel.ALL,records);
        }
    }

    //批量写入方式二：每次设置一条，满足一定条件之后（定时或者定量）插入数据库。
    @PostMapping("/batchWritePoints2")
    public void batchWritePoints2(){
        InfluxDB influxDB = influxDbUtils.getInfluxDB();
        //创建默认策略
        //influxDbUtils.createDefaultRetentionPolicy();

        //随机数模拟实际值
        Random rand =new Random(25);
        int value=rand.nextInt(100);

        //模拟2块水表的数据插入一个数据表中
        for (int i = 3; i < 4; i++) {

            String deviceId="device"+i;
            BatchPoints batchPoints=BatchPoints.database(influxDbUtils.database)
                    .retentionPolicy(influxDbUtils.DefaultPolicyName)
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();

            //每个水表10000条记录
            for (int j = 0; j < 10000; j++) {
                //sleep
                try {
                    Thread.sleep(10);                 //1000 milliseconds is one second.
                } catch(InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }

                //radom value
                value=rand.nextInt(100);

                LogInfo logInfo=LogInfo.builder()
                        .level("normal")
                        .module("waterMeter")
                        .deviceId(deviceId)
                        .msg(""+j).build();

                Point point = Point.measurementByPOJO(logInfo.getClass())
                        .addFieldsFromPOJO(logInfo)
                        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                        .build();

                //由influxDB的enableBatch控制写请求条件
                // enableBatch这里第一个是point的个数，第二个是时间，单位毫秒，第三个时间单位一般设置成TimeUnit.MILLISECONDS就好，先不管。
                //point的个数和时间是联合使用的，如果满2000条或者10000毫秒，满足任何一个条件就会发送一次写的请求。
                // .enableBatch(2000,10000, TimeUnit.MILLISECONDS);

                influxDB.write(influxDbUtils.database, influxDbUtils.DefaultPolicyName, point);
            }

        }
    }
}
